package com.apobates.forum.utils.lang.stream;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
 * A {@link Consumer} that fans every element it receives out to a list of
 * queues, and exposes the results of a set of keyed asynchronous actions
 * through {@link #get(Object)}.
 *
 * <p>NOTE(review): presumably each queue feeds one forked copy of the stream
 * and each {@link CompletableFuture} holds the result computed over one such
 * copy; confirm against the code that constructs this class.
 *
 * @param <T> type of the elements being distributed
 * @author xiaofanku
 */
public class ForkingStreamConsumer<T> implements Consumer<T>, ForkedStreamResults {
    // Object element marking the end of the stream
    static final Object SENTINAL = new Object();
    private final List<LinkedBlockingQueue<T>> queues;
    private final Map<Object, CompletableFuture<?>> actions;

    /**
     * Creates a consumer over the given queues and keyed actions.
     * Both arguments are stored as-is, without copying.
     *
     * @param queues  queues that will each receive every accepted element
     * @param actions futures looked up by key in {@link #get(Object)}
     */
    ForkingStreamConsumer(List<LinkedBlockingQueue<T>> queues, Map<Object, CompletableFuture<?>> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    /**
     * Appends the element to every queue.
     *
     * <p>Known limitation: {@code add} does not block. If a queue was created
     * with a capacity bound and is full (the producer is faster than the
     * consumer), {@code add} throws {@link IllegalStateException}.
     *
     * @param t element to distribute
     */
    @Override
    public void accept(T t) {
        for (LinkedBlockingQueue<T> queue : queues) {
            queue.add(t);
        }
    }

    /**
     * Waits for the action registered under {@code key} and returns its result.
     *
     * @param key key the action was registered with
     * @param <R> result type expected by the caller; the cast is unchecked
     * @return the value the action's future completed with
     * @throws IllegalArgumentException if no action is registered under {@code key}
     * @throws RuntimeException wrapping the original exception if the wait is
     *         interrupted (the interrupt flag is restored first) or the future
     *         did not complete normally
     */
    @Override
    public <R> R get(Object key) {
        final CompletableFuture<?> future = actions.get(key);
        if (future == null) {
            // Fail with a message naming the key instead of an opaque wrapped NPE.
            throw new IllegalArgumentException("No forked action registered for key: " + key);
        }
        try {
            // The caller chooses R; nothing here can verify it at runtime.
            @SuppressWarnings("unchecked")
            final R result = (R) future.get();
            return result;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Signals the end of the stream by pushing {@link #SENTINAL} into every queue.
     */
    // The cast compiles to nothing after erasure; the marker is a plain Object, not a real T.
    // NOTE(review): queue readers presumably compare against SENTINAL by identity; confirm.
    @SuppressWarnings("unchecked")
    void finish() {
        accept((T) SENTINAL);
    }
}